package com.ihr360.job.core.step;

import com.ihr360.commons.exception.Ihr360Exception;
import com.ihr360.job.core.BatchStatus;
import com.ihr360.job.core.ChunkListener;
import com.ihr360.job.core.ExitStatus;
import com.ihr360.job.core.StepExecutionListener;
import com.ihr360.job.core.entity.StepContribution;
import com.ihr360.job.core.entity.StepExecution;
import com.ihr360.job.core.item.ExecutionContext;
import com.ihr360.job.core.item.ItemStream;
import com.ihr360.job.core.job.builder.JobBuilderHelper;
import com.ihr360.job.core.listener.CompositeChunkListener;
import com.ihr360.job.core.repeat.RepeatContext;
import com.ihr360.job.core.repeat.RepeatOperations;
import com.ihr360.job.core.repeat.RepeatStatus;
import com.ihr360.job.core.repeat.support.CompositeItemStream;
import com.ihr360.job.core.repeat.support.RepeatTemplate;
import com.ihr360.job.core.scope.context.ChunkContext;
import com.ihr360.job.core.scope.context.StepContextRepeatCallback;
import com.ihr360.job.core.step.tasklet.Tasklet;
import com.ihr360.job.core.step.tasklet.UncheckedTransactionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;

import java.util.concurrent.Semaphore;

/**
 * Simple implementation of executing the step as a call to a {@link Tasklet},
 * possibly repeated, and each call surrounded by a transaction. The structure
 * is therefore that of a loop with transaction boundary inside the loop. The
 * loop is controlled by the step operations (
 * {@link #setStepOperations(RepeatOperations)}).<br>
 * <br>
 * <p>
 * Clients can use interceptors in the step operations to intercept or listen to
 * the iteration on a step-wide basis, for instance to get a callback when the
 * step is complete. Those that want callbacks at the level of an individual
 * task can specify interceptors for the chunk operations.
 *
 * @author Dave Syer
 * @author Lucas Ward
 * @author Ben Hale
 * @author Robert Kasanicky
 * @author Michael Minella
 * @author Will Schipp
 */
@SuppressWarnings("serial")
public class TaskletStep extends AbstractStep {

    /**
     * Execution-context key under which the class name of the tasklet is
     * stored at the start of {@link #doExecute(StepExecution)}.
     */
    public static final String TASKLET_TYPE_KEY = "batch.taskletType";

    // Log under this class's own category (the logger used to be created with
    // the name of an unrelated builder class).
    protected final Logger logger = LoggerFactory.getLogger(TaskletStep.class);

    private RepeatOperations stepOperations = new RepeatTemplate();

    private CompositeChunkListener chunkListener = new CompositeChunkListener();

    // default to checking current thread for interruption.
    private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy();

    private CompositeItemStream stream = new CompositeItemStream();

    private PlatformTransactionManager transactionManager;

    // Default attribute: every Throwable raised inside the chunk transaction
    // requests a rollback.
    private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() {

        @Override
        public boolean rollbackOn(Throwable ex) {
            return true;
        }

    };

    private Tasklet tasklet;

    /**
     * Default constructor; delegates to {@link #TaskletStep(String)} with a
     * {@code null} name.
     */
    public TaskletStep() {
        this(null);
    }

    /**
     * Create a step with the given name.
     *
     * @param name the step name, passed straight to the superclass constructor
     */
    public TaskletStep(String name) {
        super(name);
    }

    /**
     * Runs the superclass checks and then verifies that a transaction manager
     * has been configured.
     *
     * @throws Exception             if the superclass validation fails
     * @throws IllegalStateException if no transaction manager has been set
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        super.afterPropertiesSet();
        Assert.state(transactionManager != null, "A transaction manager must be provided");
    }

    /**
     * Public setter for the {@link PlatformTransactionManager}.
     *
     * @param transactionManager the transaction manager to set
     */
    public void setTransactionManager(PlatformTransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    /**
     * Public setter for the {@link TransactionAttribute}.
     *
     * @param transactionAttribute the {@link TransactionAttribute} to set
     */
    public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
        this.transactionAttribute = transactionAttribute;
    }

    /**
     * Public setter for the {@link Tasklet}. A tasklet that also implements
     * {@link StepExecutionListener} is additionally registered as a step
     * execution listener.
     *
     * @param tasklet the {@link Tasklet} to set
     */
    public void setTasklet(Tasklet tasklet) {
        this.tasklet = tasklet;
        if (tasklet instanceof StepExecutionListener) {
            registerStepExecutionListener((StepExecutionListener) tasklet);
        }
    }

    /**
     * Register a chunk listener for callbacks at the appropriate stages in a
     * step execution.
     *
     * @param listener a {@link ChunkListener}
     */
    public void registerChunkListener(ChunkListener listener) {
        this.chunkListener.register(listener);
    }

    /**
     * Register each of the objects as listeners.
     *
     * @param listeners an array of listener objects of known types.
     */
    public void setChunkListeners(ChunkListener[] listeners) {
        for (ChunkListener listener : listeners) {
            registerChunkListener(listener);
        }
    }

    /**
     * Register each of the streams for callbacks at the appropriate time in the
     * step. The {@code ItemReader} and {@code ItemWriter} are automatically
     * registered, but it doesn't hurt to also register them here. Injected
     * dependencies of the reader and writer are not automatically registered,
     * so if you implement {@code ItemWriter} using delegation to another object
     * which itself is a {@link ItemStream}, you need to register the delegate
     * here.
     *
     * @param streams an array of {@link ItemStream} objects.
     */
    public void setStreams(ItemStream[] streams) {
        for (ItemStream itemStream : streams) {
            registerStream(itemStream);
        }
    }

    /**
     * Register a single {@link ItemStream} for callbacks to the stream
     * interface.
     *
     * @param stream the {@link ItemStream} to add to this step's composite stream
     */
    public void registerStream(ItemStream stream) {
        this.stream.register(stream);
    }

    /**
     * The {@link RepeatOperations} to use for the outer loop of the batch
     * processing. Should be set up by the caller through a factory. Defaults to
     * a plain {@link RepeatTemplate}.
     *
     * @param stepOperations a {@link RepeatOperations} instance.
     */
    public void setStepOperations(RepeatOperations stepOperations) {
        this.stepOperations = stepOperations;
    }

    /**
     * Setter for the {@link StepInterruptionPolicy}. The policy is used to
     * check whether an external request has been made to interrupt the job
     * execution.
     *
     * @param interruptionPolicy a {@link StepInterruptionPolicy}
     */
    public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) {
        this.interruptionPolicy = interruptionPolicy;
    }

    /**
     * Process the step and update its context so that progress can be monitored
     * by the caller. The step is broken down into chunks, each one executing in
     * a transaction. The step and its execution and execution context are all
     * given an up to date {@link BatchStatus}, and the job repository is
     * used to store the result. Various reporting information are also added to
     * the current context governing the step execution, which would normally be
     * available to the caller through the step's {@link ExecutionContext}.<br>
     * <p>
     * The interruption policy is consulted before and after every chunk
     * transaction. A checked exception raised inside the transaction arrives
     * here wrapped in an {@link UncheckedTransactionException} and is unwrapped
     * and rethrown.
     *
     * @param stepExecution the execution being processed
     * @throws Exception if the interruption policy, the tasklet or the job
     *                   repository fails during a chunk
     */
    @Override
    protected void doExecute(StepExecution stepExecution) throws Exception {
        stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());
        stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());

        stream.update(stepExecution.getExecutionContext());
        getJobRepository().updateExecutionContext(stepExecution);

        // Shared semaphore per step execution, so other step executions can run
        // in parallel without needing the lock
        final Semaphore semaphore = createSemaphore();

        stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {

            @Override
            public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
                    throws Exception {

                StepExecution currentExecution = chunkContext.getStepContext().getStepExecution();

                // Before starting a new transaction, check for
                // interruption.
                interruptionPolicy.checkInterrupted(currentExecution);

                RepeatStatus result;
                try {
                    TransactionTemplate template = new TransactionTemplate(transactionManager, transactionAttribute);
                    result = template.execute(new ChunkTransactionCallback(chunkContext, semaphore));
                } catch (UncheckedTransactionException e) {
                    // Allow checked exceptions to be thrown inside callback
                    throw (Exception) e.getCause();
                }

                chunkListener.afterChunk(chunkContext);

                // Check for interruption after transaction as well, so that
                // the interrupted exception is correctly propagated up to
                // caller
                interruptionPolicy.checkInterrupted(currentExecution);

                return result;
            }

        });

    }

    /**
     * Extension point mainly for test purposes so that the behaviour of the
     * lock can be manipulated to simulate various pathologies.
     *
     * @return a semaphore for locking access to the JobRepository
     */
    protected Semaphore createSemaphore() {
        return new Semaphore(1);
    }

    /**
     * Closes the composite stream holding all registered item streams.
     *
     * @param ctx the execution context (not used by this implementation)
     */
    @Override
    protected void close(ExecutionContext ctx) throws Exception {
        stream.close();
    }

    /**
     * Opens the composite stream holding all registered item streams.
     *
     * @param ctx the execution context handed to the streams
     */
    @Override
    protected void open(ExecutionContext ctx) throws Exception {
        stream.open(ctx);
    }

    /**
     * retrieve the tasklet - helper method for JobOperator
     *
     * @return the {@link Tasklet} instance being executed within this step
     */
    public Tasklet getTasklet() {
        return tasklet;
    }

    /**
     * A callback for the transactional work inside a chunk. Also detects
     * failures in the transaction commit and rollback, only panicking if the
     * transaction status is unknown (i.e. if a commit failure leads to a clean
     * rollback then we assume the state is consistent).
     *
     * @author Dave Syer
     */
    private class ChunkTransactionCallback extends TransactionSynchronizationAdapter implements TransactionCallback<RepeatStatus> {

        private final StepExecution stepExecution;

        private final ChunkContext chunkContext;

        // Guards against counting more than one rollback per chunk.
        private boolean rolledBack = false;

        // Set once the contribution has been applied to the step execution.
        private boolean stepExecutionUpdated = false;

        // Snapshot taken before the tasklet runs, restored if the commit fails.
        private StepExecution oldVersion;

        // True while this callback holds a permit of the shared semaphore.
        private boolean locked = false;

        private final Semaphore semaphore;

        /**
         * @param chunkContext the chunk being processed; the step execution is
         *                     taken from its step context
         * @param semaphore    the lock shared by all chunks of the step execution
         */
        public ChunkTransactionCallback(ChunkContext chunkContext, Semaphore semaphore) {
            this.chunkContext = chunkContext;
            this.stepExecution = chunkContext.getStepContext().getStepExecution();
            this.semaphore = semaphore;
        }

        /**
         * Transaction synchronization hook. For any status other than
         * committed it restores the snapshot (if the step execution had
         * already been updated) and notifies the chunk listeners of the error.
         * An unknown status additionally marks the step execution as
         * {@link BatchStatus#UNKNOWN} and terminate-only. The semaphore is
         * released last, and only if this callback acquired it.
         *
         * @param status one of the {@link TransactionSynchronization}
         *               {@code STATUS_*} constants
         */
        @Override
        public void afterCompletion(int status) {
            try {
                if (status != TransactionSynchronization.STATUS_COMMITTED) {
                    if (stepExecutionUpdated) {
                        // Wah! the commit failed. We need to rescue the step
                        // execution data.
                        logger.info("Commit failed while step execution data was already updated. "
                                + "Reverting to old version.");
                        copy(oldVersion, stepExecution);
                        if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
                            rollback(stepExecution);
                        }
                    }
                    chunkListener.afterChunkError(chunkContext);
                }

                if (status == TransactionSynchronization.STATUS_UNKNOWN) {
                    logger.error("Rolling back with transaction in unknown state");
                    rollback(stepExecution);
                    stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
                    stepExecution.setTerminateOnly();
                }
            } finally {
                // Only release the lock if we acquired it, and release as late
                // as possible
                if (locked) {
                    semaphore.release();
                }

                locked = false;
            }
        }

        /**
         * Executes the tasklet once inside the current transaction, applies
         * its contribution to the step execution and saves the step execution
         * through the job repository.
         * <p>
         * A tasklet exception is rethrown only when the transaction attribute
         * asks for a rollback on it; otherwise it is swallowed after the exit
         * status has been recorded and the initial
         * {@link RepeatStatus#CONTINUABLE} result is returned.
         *
         * @param status the transaction status supplied by the template (not
         *               used by this implementation)
         * @return the tasklet's result, or {@link RepeatStatus#FINISHED} if
         * the tasklet returned {@code null}
         * @throws UncheckedTransactionException wrapping any checked exception
         *                                       so that it can leave the callback
         */
        @Override
        public RepeatStatus doInTransaction(TransactionStatus status) {
            TransactionSynchronizationManager.registerSynchronization(this);

            RepeatStatus result = RepeatStatus.CONTINUABLE;

            StepContribution contribution = stepExecution.createStepContribution();

            chunkListener.beforeChunk(chunkContext);

            // In case we need to push it back to its old value
            // after a commit fails...
            oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
            copy(stepExecution, oldVersion);

            try {

                try {
                    try {
                        result = tasklet.execute(contribution, chunkContext);
                        if (result == null) {
                            result = RepeatStatus.FINISHED;
                        }
                    } catch (Ihr360Exception e) {
                        // Business failure: record FAILED on both the step
                        // execution and the contribution, and stop the step.
                        ExitStatus newStatus = failureStatus(ExitStatus.FAILED.getExitCode(), e);
                        stepExecution.setExitStatus(newStatus);
                        stepExecution.setStatus(BatchStatus.STOPPED);
                        contribution.setExitStatus(newStatus);
                        if (transactionAttribute.rollbackOn(e)) {
                            chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e);
                            throw e;
                        }
                    } catch (Exception e) {
                        // Keep the current exit code when there is one and
                        // only attach the failure details to it.
                        ExitStatus exitStatus = stepExecution.getExitStatus();
                        String exitCode = exitStatus != null
                                ? exitStatus.getExitCode()
                                : ExitStatus.FAILED.getExitCode();

                        stepExecution.setExitStatus(failureStatus(exitCode, e));
                        if (transactionAttribute.rollbackOn(e)) {
                            chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e);
                            throw e;
                        }
                    }
                } finally {

                    // If the step operations are asynchronous then we need
                    // to synchronize changes to the step execution (at a
                    // minimum). Take the lock *before* changing the step
                    // execution.
                    try {
                        semaphore.acquire();
                        locked = true;
                    } catch (InterruptedException e) {
                        logger.error("Thread interrupted while locking for repository update");
                        stepExecution.setStatus(BatchStatus.STOPPED);
                        stepExecution.setTerminateOnly();
                        Thread.currentThread().interrupt();
                    }

                    // Apply the contribution to the step
                    // even if unsuccessful
                    logger.debug("Applying contribution: {}", contribution);
                    stepExecution.apply(contribution);

                }

                stepExecutionUpdated = true;

                stream.update(stepExecution.getExecutionContext());

                try {
                    // Going to attempt a commit. If it fails this flag will
                    // stay false and we can use that later.
                    getJobRepository().updateExecutionContext(stepExecution);
                    stepExecution.incrementCommitCount();
                    logger.debug("Saving step execution before commit: {}", stepExecution);
                    getJobRepository().update(stepExecution);
                } catch (Exception e) {
                    // If we get to here there was a problem saving the step
                    // execution and we have to fail.
                    String msg = "JobRepository failure forcing rollback";
                    logger.error(msg, e);
                    throw new FatalStepExecutionException(msg, e);
                }
            } catch (Error e) {
                logger.debug("Rollback for Error: {}: {}", e.getClass().getName(), e.getMessage());
                rollback(stepExecution);
                throw e;
            } catch (RuntimeException e) {
                logger.debug("Rollback for RuntimeException: {}: {}", e.getClass().getName(), e.getMessage());
                rollback(stepExecution);
                throw e;
            } catch (Exception e) {
                logger.debug("Rollback for Exception: {}: {}", e.getClass().getName(), e.getMessage());
                rollback(stepExecution);
                // Allow checked exceptions
                throw new UncheckedTransactionException(e);
            }

            return result;

        }

        /**
         * Builds the exit status recorded for a tasklet failure: the given
         * exit code, the string form of the failure's cause (empty when there
         * is no cause) and the failure's message.
         */
        private ExitStatus failureStatus(String exitCode, Throwable failure) {
            Throwable cause = failure.getCause();
            String causeText = cause != null ? cause.toString() : "";
            return new ExitStatus(exitCode, causeText, failure.getMessage());
        }

        /**
         * Increments the rollback count of the step execution, at most once
         * per callback instance.
         */
        private void rollback(StepExecution stepExecution) {
            if (!rolledBack) {
                stepExecution.incrementRollbackCount();
                rolledBack = true;
            }
        }

        /**
         * Copies version, write/filter/commit counts and a fresh copy of the
         * execution context from {@code source} to {@code target}. No other
         * properties are transferred.
         */
        private void copy(final StepExecution source, final StepExecution target) {
            target.setVersion(source.getVersion());
            target.setWriteCount(source.getWriteCount());
            target.setFilterCount(source.getFilterCount());
            target.setCommitCount(source.getCommitCount());
            target.setExecutionContext(new ExecutionContext(source.getExecutionContext()));
        }

    }
}